-- Tags: no-parallel, no-fasttest, no-shared-merge-tree
-- Tag no-parallel - due to static databases
-- Tag no-fasttest - S3 is required
-- Tag no-shared-merge-tree - no reliable way to make SMT read-only in stateless test

-- Start from a clean state: the shard databases have static names, so leftovers
-- of a previous (possibly aborted) run have to be removed explicitly.
drop database if exists shard_0;
drop database if exists shard_1;
create database shard_0;
create database shard_1;

-- Same for every Distributed table created further down.
drop table if exists dist;
drop table if exists dist_batch;
drop table if exists dist_single_no_internal_replication_read_only;
drop table if exists dist_single;

-- Writable replica. The database macro in the ZooKeeper path is expanded per
-- table, so shard_0.data and shard_1.data are registered under separate paths
-- (see the explicit '/tables/shard_1/data/...' path used below).
create table shard_0.data (key Int) engine=ReplicatedMergeTree('/tables/{database}/data', 'write') order by key;
-- Replica that is going to be turned read-only.
create table shard_1.data (key Int) engine=ReplicatedMergeTree('/tables/{database}/data', 'read') order by key;

-- Break the replica and leave it read-only: while the table is detached, its
-- `columns` node in ZooKeeper is overwritten with an empty value, so the table
-- is expected to come back in read-only mode after attach.
detach table shard_1.data;
insert into system.zookeeper (path, name, value) values ('/tables/shard_1/data/replicas/read', 'columns', '');
attach table shard_1.data;
-- Verify the state of both tables; the explicit ordering keeps the output stable.
select database||'.'||table, is_readonly from system.replicas where database in ('shard_0', 'shard_1') and table = 'data' order by database;

-- Distributed tables under test. Background sends are stopped for each of them
-- right after creation, so queued data only leaves the initiator on an
-- explicit `system flush distributed`.

-- Empty database argument (presumably each replica then uses the default
-- database from the cluster definition).
create table dist as shard_0.data
engine = Distributed('test_cluster_two_replicas_different_databases_internal_replication', '', 'data');
system stop distributed sends dist;

-- Same as `dist`, but with batching of background inserts enabled.
create table dist_batch as shard_0.data
engine = Distributed('test_cluster_two_replicas_different_databases_internal_replication', '', 'data')
settings background_insert_batch = 1;
system stop distributed sends dist_batch;

-- Single-shard cluster pointed explicitly at the read-only shard_1.data.
create table dist_single_no_internal_replication_read_only as shard_0.data
engine = Distributed('test_shard_localhost', 'shard_1', 'data');
system stop distributed sends dist_single_no_internal_replication_read_only;

-- Two-replica cluster with the database forced to shard_1.
create table dist_single as shard_0.data
engine = Distributed('test_cluster_two_replicas_different_databases_internal_replication', 'shard_1', 'data');
system stop distributed sends dist_single;

-- Session settings shared by all inserts below.
set
    prefer_localhost_replica = 0,
    insert_deduplicate = 0,
    -- the replica is read-only, so keep the number of Keeper retries small
    insert_keeper_max_retries = 3,
    -- and switch fault injection off so that it cannot cause spurious failures
    insert_keeper_fault_injection_probability = 0;

-- For internal_replication == false, distributed_insert_skip_read_only_replicas does not change anything:
-- the initiator has to write the data to every replica itself, so it cannot skip any of them.
-- Both inserts are therefore expected to fail with the error named in the trailing serverError hint.
insert into dist_single_no_internal_replication_read_only settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=0 values (0); -- { serverError TABLE_IS_READ_ONLY }
insert into dist_single_no_internal_replication_read_only settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=1 values (0); -- { serverError TABLE_IS_READ_ONLY }

-- For internal_replication == true the setting does make a difference: without it the insert
-- reaches the read-only table, with it the expected error changes to ALL_CONNECTION_TRIES_FAILED
-- (presumably because dist_single forces the shard_1 database, leaving no writable replica to pick).
insert into dist_single settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=0 values (0); -- { serverError TABLE_IS_READ_ONLY }
insert into dist_single settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=1 values (0); -- { serverError ALL_CONNECTION_TRIES_FAILED }

-- min_insert_block_size_rows is not enough, since the replica is selected
-- before that, so we need to perform INSERT multiple times to ensure that the
-- read-only replica would be selected at least once (if it was not filtered out).
-- The template loop expands to 29 foreground inserts (i = 1..29); none of them
-- carries an error hint, so each one has to succeed.
{% for i in range(1, 30) %}
insert into dist settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=1, load_balancing='round_robin' values ({{ i }});
{% endfor %}
-- The profile event cannot be checked for background inserts, so it is only
-- verified here, over the foreground inserts issued so far by this test.
system flush logs;
select
    'DistributedConnectionSkipReadOnlyReplica',
    count() > 0,
    sum(ProfileEvents['DistributedConnectionSkipReadOnlyReplica']) > 0
from system.query_log
where event_date >= yesterday()
    and current_database = currentDatabase()
    and type != 'QueryStart'
    and query_kind = 'Insert'
    and Settings['distributed_foreground_insert'] = '1';

-- Background inserts (distributed_foreground_insert=0) into `dist`: each insert
-- is only queued on the initiator, because background sends were stopped
-- above, and is then pushed explicitly with `system flush distributed`.
-- Neither statement carries an error hint, so every flush has to succeed.
{% for i in range(1, 30) %}
insert into dist settings distributed_foreground_insert=0, distributed_insert_skip_read_only_replicas=1 values ({{ i }});
system flush distributed dist;
{% endfor %}

-- The same scenario for `dist_batch`, i.e. with background_insert_batch=1.
{% for i in range(1, 30) %}
insert into dist_batch settings distributed_foreground_insert=0, distributed_insert_skip_read_only_replicas=1 values ({{ i }});
system flush distributed dist_batch;
{% endfor %}

-- Cleanup: remove the Distributed tables first, so that none of them is left
-- behind pointing at the databases dropped right after.
drop table if exists dist;
drop table if exists dist_batch;
drop table if exists dist_single_no_internal_replication_read_only;
drop table if exists dist_single;

-- The shard databases have static names and must not leak into other tests.
drop database shard_0;
drop database shard_1;

-- vim: ft=sql
